package co.recloud.ariadne.server;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import co.recloud.ariadne.model.Host;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.model.plan.Operator;
import co.recloud.ariadne.request.CommitGossipRequest;
import co.recloud.ariadne.request.DataRequest;
import co.recloud.ariadne.request.GridUpdateRequest;
import co.recloud.ariadne.request.HostRequest;
import co.recloud.ariadne.request.KeyMigrateRequest;
import co.recloud.ariadne.request.KeyUpdateGossipRequest;
import co.recloud.ariadne.request.Request;
import co.recloud.ariadne.request.TransactionRequest;
import co.recloud.ariadne.request.TransactionStartGossipRequest;
import co.recloud.ariadne.request.TransitionRequest;
import co.recloud.ariadne.request.WorkRequest;
import co.recloud.ariadne.response.DataResponse;
import co.recloud.ariadne.response.HostResponse;
import co.recloud.ariadne.response.Response;
import co.recloud.ariadne.store.HostTable;
import co.recloud.ariadne.store.MemoryCache;
import co.recloud.ariadne.store.MemoryStore;
import co.recloud.ariadne.store.TransactionCache;
import co.recloud.ariadne.thread.Main;

public class DataServer {
	private static SortedSet<Long> activeStartTimes = new TreeSet<Long>();
	private static SortedSet<Long> committedStartTimes = new TreeSet<Long>();

	private static synchronized void addActiveStartTime(Long startTime) {
		if (!committedStartTimes.contains(startTime) && !activeStartTimes.contains(startTime)) {
			activeStartTimes.add(startTime);
		}
	}

	private static synchronized void clearStartTime(Long startTime) {
		if (activeStartTimes.contains(startTime)) {
			activeStartTimes.remove(startTime);
		}
		if(!committedStartTimes.contains(startTime)) {
			committedStartTimes.add(startTime);
		}
	}
	
	public static synchronized SortedSet<Long> getActiveStartTimes() {
		SortedSet<Long> result = new TreeSet<Long>();
		result.addAll(activeStartTimes);
		return result;
	}
	
	public static synchronized void condenseCommittedTimes() {
		committedStartTimes.clear();
	}
	
	public Response serve(Request request) {
		Response response = null;
		if (request instanceof DataRequest) {
			DataRequest dataRequest = (DataRequest) request;
			if (dataRequest.getType() == DataRequest.GET) {
				response = serveGet(dataRequest);
			} else if (dataRequest.getType() == DataRequest.PUT) {
				response = servePut(dataRequest);
			}
		} else if (request instanceof HostRequest) {
			response = serveHostRequest(request);
		} else if (request instanceof GridUpdateRequest) {
			response = serveGridUpdateRequest(request);
		} else if (request instanceof TransitionRequest) {
			response = serveTransitionRequest(request);
		} else if (request instanceof WorkRequest) {
			response = serveWorkRequest(request);
		} else if (request instanceof TransactionStartGossipRequest) {
			response = serveTransactionStartGossipRequest(request);
		} else if (request instanceof CommitGossipRequest) {
			response = serveCommitGossipRequest(request);
		} else if (request instanceof TransactionRequest) {
			response = serveTransactionRequest(request);
		} else if (request instanceof KeyUpdateGossipRequest) {
			response = serveKeyUpdateGossipRequest(request);
		} else if (request instanceof KeyMigrateRequest) {
			response = serveKeyMigrateRequest(request);
		}
		return response;
	}

	private Response serveKeyMigrateRequest(Request request) {
		Response response = null;
		try {
			response = new Response();
			KeyMigrateRequest keyReq = (KeyMigrateRequest) request;
			HostTable ht = HostTable.getInstance();
			MemoryStore store = MemoryStore.getStore(ht.getLocalhost().getLocation());
			store.putAllVersions(keyReq.getPath(), keyReq.getVersions());
		} catch (Exception e) {
			response = null;
		}
		return response;
	}

	private Response serveKeyUpdateGossipRequest(Request request) {
		Response response = new Response();
		KeyUpdateGossipRequest updateGossip = (KeyUpdateGossipRequest) request;
		MemoryCache memCache = MemoryCache.getInstance();
		memCache.invalidate(updateGossip.getPath(), updateGossip.getUpdatedTime());
		updateGossip.gossip();
		return response;
	}

	private Response serveTransactionStartGossipRequest(Request request) {
		Response response = new Response();
		TransactionStartGossipRequest startReq = (TransactionStartGossipRequest) request;
		addActiveStartTime(startReq.getTransaction().getStartTime());
		startReq.gossip();
		return response;
	}

	private Response serveCommitGossipRequest(Request request) {
		Response response = new Response();
		CommitGossipRequest commitReq = (CommitGossipRequest) request;
		clearStartTime(commitReq.getTransaction().getStartTime());
		commitReq.gossip();
		return response;
	}

	private Response serveTransactionRequest(Request request) {
		TransactionRequest txnRequest = (TransactionRequest) request;
		Transaction txn = txnRequest.getTransaction();
		if(txn.getState() == Transaction.COMMITTED) {
			TransactionCache cache = TransactionCache.getInstance();
			cache.writeBack(txn);
			cache.freeTransaction(txn);
		} else if (txn.getState() == Transaction.ABORTED) {
			TransactionCache cache = TransactionCache.getInstance();
			cache.freeTransaction(txn);
		}
		Response response = new Response();
		return response;
	}

	private Response serveWorkRequest(Request request) {
		WorkRequest workReq = (WorkRequest) request;
		Operator operator = workReq.getOperator();
		operator.execute(null, 0);
		Response response = new Response();
		return response;
	}

	private Response serveTransitionRequest(Request request) {
		TransitionRequest tReq = (TransitionRequest) request;
		Response response = null;
		if (Main.getCurrentStatus() == Main.STATUS_STANDBY
				&& tReq.getNewStatus() == Main.STATUS_SERVICE_PENDING) {
			Main.setCurrentStatus(Main.STATUS_SERVICE_PENDING);
			System.out.println("SERVICE PENDING");
			System.out.println(tReq.getLocation());
			response = new Response();
		}
		if (Main.getCurrentStatus() == Main.STATUS_SERVICE_PENDING
				&& tReq.getNewStatus() == Main.STATUS_SERVICE) {
			Main.setCurrentStatus(Main.STATUS_SERVICE);
			System.out.println("SERVICE");
			response = new Response();
		}
		return response;
	}

	private Response serveHostRequest(Request request) {
		HostResponse response = new HostResponse();
		HostRequest hostRequest = (HostRequest) request;
		HostTable ht = HostTable.getInstance();
		Host addedHost = hostRequest.getAddedHost();
		if (!ht.getTokenRing().containsKey(addedHost.getToken())) {
			ht.merge(hostRequest.getHostTable());
			if (!hostRequest.isSeeded()) {
				Set<Host> hostSet = new HashSet<Host>();
				hostSet.addAll(ht.getTokenRing().values());
				hostSet.remove(ht.getLocalhost());
				hostSet.remove(addedHost);
				for (Host coHost : hostSet) {
					HostRequest seedRequest = new HostRequest();
					seedRequest.setAddedHost(addedHost);
					seedRequest.setReturnHostTable(false);
					seedRequest.setHostTable(ht);
					seedRequest.setSeeded(true);
					seedRequest.setTarget(coHost);
					seedRequest.sendBlocking(coHost);
				}
			}
		}
		if (hostRequest.isReturnHostTable()) {
			System.out.println(ht.getTokenRing());
			response.setHostTable(ht);
		}
		return response;
	}

	private Response serveGridUpdateRequest(Request request) {
		Response response = new Response();
		GridUpdateRequest gridReq = (GridUpdateRequest) request;
		if (gridReq.getHostTable() != null) {
			HostTable ht = HostTable.getInstance();
			ht.merge(gridReq.getHostTable());
			System.out.println("locations are now " + ht.getLocationToHost());
		}
		return response;
	}

	private DataResponse servePut(DataRequest dataRequest) {
		DataResponse response = new DataResponse();
		MemoryStore store = MemoryStore.getStore(dataRequest.getPrimary());
		Map<String, Map<String, Map<String, Map<String, Object>>>> data = dataRequest
				.getData();
		for (String schema : data.keySet()) {
			for (String columnFamily : data.get(schema).keySet()) {
				for (String key : data.get(schema).get(columnFamily).keySet()) {
					Map<String, Object> oldRow = store.get(schema, columnFamily, key, dataRequest.getTransaction().getCommitTime());
					if(oldRow != null) {
					store.put(schema, columnFamily, key, dataRequest.getTransaction().getCommitTime(),
							dataRequest.getTransaction().getId(),
							oldRow);
					}
					store.put(schema, columnFamily, key, dataRequest.getTransaction().getCommitTime(),
							dataRequest.getTransaction().getId(),
							data.get(schema).get(columnFamily).get(key));
				}
			}
		}
		return response;
	}

	private DataResponse serveGet(DataRequest dataRequest) {
		DataResponse response = null;
		MemoryStore store = MemoryStore.getStore(dataRequest.getPrimary());
		if (store != null) {
			Map<String, Object> row = new HashMap<String, Object>();
			Long rowTime = store.getTime(dataRequest.getSchema(),
					dataRequest.getColumnFamily(), dataRequest.getKey(),
					dataRequest.getTransaction().getStartTime());
			response = new DataResponse();
			if(rowTime != null) {
				Map<String, Object> storedRow = store.get(dataRequest.getSchema(),
						dataRequest.getColumnFamily(), dataRequest.getKey(),
						rowTime);
				if (storedRow != null) {
					if(dataRequest.getColumns() != null) {
						for (String column : dataRequest.getColumns()) {
							row.put(column, storedRow.get(column));
						}     
					} else {
						row.putAll(storedRow);
					}
					response.setRowTime(rowTime);
					response.setData(row);
				}
			} else {
				response.setTime(0L);
				response.setData(null);
			}
		}
		return response;
	}
}
